// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeSet, BTreeMap, VecDeque};
use std::fmt::{Debug, Formatter, Error as FmtError};
use std::time;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use ethkey::{Public, Secret};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage, DocumentKeyShare};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::message::{Message, GenerationMessage, InitializeSession, ConfirmInitialization, CompleteInitialization,
	KeysDissemination, PublicKeyShare, SessionError, SessionCompleted};

/// Key generation session API.
pub trait Session: Send + Sync + 'static {
	/// Get generation session state.
	fn state(&self) -> SessionState;
	/// Wait until session is completed. Returns public portion of generated server key.
	fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error>;
	/// Get joint public key (if it is known).
	fn joint_public_and_secret(&self) -> Option<Result<(Public, Secret), Error>>;
}

/// Distributed key generation session.
/// Based on "ECDKG: A Distributed Key Generation Protocol Based on Elliptic Curve Discrete Logarithm" paper:
/// http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.124.4128&rep=rep1&type=pdf
/// Brief overview:
/// 1) initialization: master node (which has received request for generating joint public + secret) initializes the session on all other nodes
/// 2) key dissemination (KD): all nodes are generating secret + public values and send these to appropriate nodes
/// 3) key verification (KV): all nodes are checking values, received for other nodes
/// 4) key generation phase (KG): nodes are exchanging with information, enough to generate joint public key
pub struct SessionImpl {
	/// Unique session id.
	id: SessionId,
	/// Public identifier of this node.
	self_node_id: NodeId,
	/// Key storage.
	key_storage: Option<Arc<KeyStorage>>,
	/// Cluster which allows this node to send messages to other nodes in the cluster.
	cluster: Arc<Cluster>,
	/// SessionImpl completion condvar.
	completed: Condvar,
	/// Mutable session data.
	data: Mutex<SessionData>,
}

/// SessionImpl creation parameters
pub struct SessionParams {
	/// SessionImpl identifier.
	pub id: SessionId,
	/// Id of node, on which this session is running.
	pub self_node_id: Public,
	/// Key storage.
	pub key_storage: Option<Arc<KeyStorage>>,
	/// Cluster
	pub cluster: Arc<Cluster>,
}

#[derive(Debug)]
/// Mutable data of distributed key generation session.
struct SessionData {
	/// Current state of the session.
	state: SessionState,
	/// Simulate faulty behaviour?
	simulate_faulty_behaviour: bool,

	// === Values, filled when session initialization just starts ===
	/// Reference to the node, which has started this session.
	master: Option<NodeId>,
	/// Public key of the creator of the session.
	author: Option<Public>,

	// === Values, filled when session initialization is completed ===
	/// Threshold value for this DKG. Only `threshold + 1` will be able to collectively recreate joint secret,
	/// and thus - decrypt message, encrypted with joint public.
	threshold: Option<usize>,
	/// Random point, jointly generated by every node in the cluster.
	derived_point: Option<Public>,
	/// Nodes-specific data.
	nodes: BTreeMap<NodeId, NodeData>,

	// === Values, filled during KD phase ===
	/// Value of polynom1[0], generated by this node.
	secret_coeff: Option<Secret>,

	// === Values, filled during KG phase ===
	/// Secret share, which this node holds. Persistent + private.
	secret_share: Option<Secret>,

	/// === Values, filled when DKG session is completed successfully ===
	/// Key share.
	key_share: Option<Result<DocumentKeyShare, Error>>,
	/// Jointly generated public key, which can be used to encrypt secret. Public.
	joint_public_and_secret: Option<Result<(Public, Secret), Error>>,
}

#[derive(Debug, Clone)]
/// Mutable node-specific data.
struct NodeData {
	/// Random unique scalar. Persistent.
	pub id_number: Secret,

	// === Values, filled during KD phase ===
	/// Secret value1, which has been sent to this node.
	pub secret1_sent: Option<Secret>,
	/// Secret value2, which has been sent to this node.
	pub secret2_sent: Option<Secret>,
	/// Secret value1, which has been received from this node.
	pub secret1: Option<Secret>,
	/// Secret value2, which has been received from this node.
	pub secret2: Option<Secret>,
	/// Public values, which have been received from this node.
	pub publics: Option<Vec<Public>>,

	// === Values, filled during KG phase ===
	/// Public share, which has been received from this node.
	pub public_share: Option<Public>,

	// === Values, filled during completion phase ===
	/// Flags marking that node has confirmed session completion (generated key is stored).
	pub completion_confirmed: bool,
}

#[derive(Debug, Clone, PartialEq)]
/// Schedule for visiting other nodes of cluster.
pub struct EveryOtherNodeVisitor {
	/// Already visited nodes.
	visited: BTreeSet<NodeId>,
	/// Not yet visited nodes.
	unvisited: VecDeque<NodeId>,
	/// Nodes, which are currently visited.
	in_progress: BTreeSet<NodeId>,
}

#[derive(Debug, Clone, PartialEq)]
/// Distributed key generation session state.
pub enum SessionState {
	// === Initialization states ===
	/// Every node starts in this state.
	WaitingForInitialization,
	/// Master node asks every other node to confirm initialization.
	/// Derived point is generated by all nodes in the cluster.
	WaitingForInitializationConfirm(EveryOtherNodeVisitor),
	/// Slave nodes are in this state until initialization completion is reported by master node.
	WaitingForInitializationComplete,

	// === KD phase states ===
	/// Node is waiting for generated keys from every other node.
	WaitingForKeysDissemination,

	// === KG phase states ===
	/// Node is waiting for joint public key share to be received from every other node.
	WaitingForPublicKeyShare,

	// === Generation phase states ===
	/// Node is waiting for session completion/session completion confirmation.
	WaitingForGenerationConfirmation,

	// === Final states of the session ===
	/// Joint public key generation is completed.
	Finished,
	/// Joint public key generation is failed.
	Failed,
}

impl SessionImpl {
	/// Create new generation session.
	pub fn new(params: SessionParams) -> Self {
		SessionImpl {
			id: params.id,
			self_node_id: params.self_node_id,
			key_storage: params.key_storage,
			cluster: params.cluster,
			completed: Condvar::new(),
			data: Mutex::new(SessionData {
				state: SessionState::WaitingForInitialization,
				simulate_faulty_behaviour: false,
				master: None,
				author: None,
				threshold: None,
				derived_point: None,
				nodes: BTreeMap::new(),
				secret_coeff: None,
				secret_share: None,
				key_share: None,
				joint_public_and_secret: None,
			}),
		}
	}

	/// Get this node Id.
	pub fn node(&self) -> &NodeId {
		&self.self_node_id
	}

	#[cfg(test)]
	/// Get derived point.
	pub fn derived_point(&self) -> Option<Public> {
		self.data.lock().derived_point.clone()
	}

	/// Simulate faulty generation session behaviour.
	pub fn simulate_faulty_behaviour(&self) {
		self.data.lock().simulate_faulty_behaviour = true;
	}

	/// Start new session initialization. This must be called on master node.
	pub fn initialize(&self, author: Public, threshold: usize, nodes: BTreeSet<NodeId>) -> Result<(), Error> {
		check_cluster_nodes(self.node(), &nodes)?;
		check_threshold(threshold, &nodes)?;

		let mut data = self.data.lock();

		// check state
		if data.state != SessionState::WaitingForInitialization {
			return Err(Error::InvalidStateForRequest);
		}

		// update state
		data.master = Some(self.node().clone());
		data.author = Some(author.clone());
		data.threshold = Some(threshold);
		for node_id in &nodes {
			// generate node identification parameter
			let node_id_number = math::generate_random_scalar()?;
			data.nodes.insert(node_id.clone(), NodeData::with_id_number(node_id_number));
		}

		let mut visit_policy = EveryOtherNodeVisitor::new(self.node(), data.nodes.keys().cloned());
		let derived_point = math::generate_random_point()?;
		match visit_policy.next_node() {
			Some(next_node) => {
				data.state = SessionState::WaitingForInitializationConfirm(visit_policy);

				// start initialization
				self.cluster.send(&next_node, Message::Generation(GenerationMessage::InitializeSession(InitializeSession {
						session: self.id.clone().into(),
						author: author.into(),
						nodes: data.nodes.iter().map(|(k, v)| (k.clone().into(), v.id_number.clone().into())).collect(),
						threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"),
						derived_point: derived_point.into(),
					})))
			},
			None => {
				drop(data);
				self.complete_initialization(derived_point)?;
				self.disseminate_keys()?;
				self.verify_keys()?;
				self.complete_generation()
			}
		}
	}

	/// Process single message.
	pub fn process_message(&self, sender: &NodeId, message: &GenerationMessage) -> Result<(), Error> {
		match message {
			&GenerationMessage::InitializeSession(ref message) =>
				self.on_initialize_session(sender.clone(), message),
			&GenerationMessage::ConfirmInitialization(ref message) =>
				self.on_confirm_initialization(sender.clone(), message),
			&GenerationMessage::CompleteInitialization(ref message) =>
				self.on_complete_initialization(sender.clone(), message),
			&GenerationMessage::KeysDissemination(ref message) =>
				self.on_keys_dissemination(sender.clone(), message),
			&GenerationMessage::PublicKeyShare(ref message) =>
				self.on_public_key_share(sender.clone(), message),
			&GenerationMessage::SessionError(ref message) =>
				self.on_session_error(sender.clone(), message),
			&GenerationMessage::SessionCompleted(ref message) => 
				self.on_session_completed(sender.clone(), message),
		}
	}

	/// When session initialization message is received.
	pub fn on_initialize_session(&self, sender: NodeId, message: &InitializeSession) -> Result<(), Error> {
		debug_assert!(self.id == *message.session);
		debug_assert!(&sender != self.node());

		// check message
		let nodes_ids = message.nodes.keys().cloned().map(Into::into).collect();
		check_threshold(message.threshold, &nodes_ids)?;
		check_cluster_nodes(self.node(), &nodes_ids)?;

		let mut data = self.data.lock();

		// check state
		if data.state != SessionState::WaitingForInitialization {
			return Err(Error::InvalidStateForRequest);
		}

		// update derived point with random scalar
		let mut derived_point = message.derived_point.clone().into();
		math::update_random_point(&mut derived_point)?;

		// send confirmation back to master node
		self.cluster.send(&sender, Message::Generation(GenerationMessage::ConfirmInitialization(ConfirmInitialization {
			session: self.id.clone().into(),
			derived_point: derived_point.into(),
		})))?;

		// update state
		data.master = Some(sender);
		data.author = Some(message.author.clone().into());
		data.state = SessionState::WaitingForInitializationComplete;
		data.nodes = message.nodes.iter().map(|(id, number)| (id.clone().into(), NodeData::with_id_number(number.clone().into()))).collect();
		data.threshold = Some(message.threshold);

		Ok(())
	}

	/// When session initialization confirmation message is reeived.
	pub fn on_confirm_initialization(&self, sender: NodeId, message: &ConfirmInitialization) -> Result<(), Error> {
		debug_assert!(self.id == *message.session);
		debug_assert!(&sender != self.node());

		let mut data = self.data.lock();
		debug_assert!(data.nodes.contains_key(&sender));

		// check state && select new node to be initialized
		let next_receiver = match data.state {
			SessionState::WaitingForInitializationConfirm(ref mut visit_policy) => {
				if !visit_policy.mark_visited(&sender) {
					return Err(Error::InvalidStateForRequest);
				}

				visit_policy.next_node()
			},
			_ => return Err(Error::InvalidStateForRequest),
		};

		// proceed message
		if let Some(next_receiver) = next_receiver {
			return self.cluster.send(&next_receiver, Message::Generation(GenerationMessage::InitializeSession(InitializeSession {
					session: self.id.clone().into(),
					author: data.author.as_ref().expect("author is filled on initialization step; confrm initialization follows initialization; qed").clone().into(),
					nodes: data.nodes.iter().map(|(k, v)| (k.clone().into(), v.id_number.clone().into())).collect(),
					threshold: data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed"),
					derived_point: message.derived_point.clone().into(),
				})));
		}

		// now it is time for keys dissemination (KD) phase
		drop(data);
		self.complete_initialization(message.derived_point.clone().into())?;
		self.disseminate_keys()
	}

	/// When session initialization completion message is received.
	pub fn on_complete_initialization(&self, sender: NodeId, message: &CompleteInitialization) -> Result<(), Error> {
		debug_assert!(self.id == *message.session);
		debug_assert!(&sender != self.node());

		let mut data = self.data.lock();

		// check state
		if data.state != SessionState::WaitingForInitializationComplete {
			return Err(Error::InvalidStateForRequest);
		}
		if data.master != Some(sender) {
			return Err(Error::InvalidMessage);
		}

		// remember passed data
		data.derived_point = Some(message.derived_point.clone().into());

		// now it is time for keys dissemination (KD) phase
		drop(data);
		self.disseminate_keys()
	}

	/// When keys dissemination message is received.
	pub fn on_keys_dissemination(&self, sender: NodeId, message: &KeysDissemination) -> Result<(), Error> {
		debug_assert!(self.id == *message.session);
		debug_assert!(&sender != self.node());

		let mut data = self.data.lock();

		// simulate failure, if required
		if data.simulate_faulty_behaviour {
			return Err(Error::Io("simulated error".into()));
		}

		// check state
		if data.state != SessionState::WaitingForKeysDissemination {
			match data.state {
				SessionState::WaitingForInitializationComplete => return Err(Error::TooEarlyForRequest),
				_ => return Err(Error::InvalidStateForRequest),
			}
		}
		debug_assert!(data.nodes.contains_key(&sender));

		// check message
		let threshold = data.threshold.expect("threshold is filled in initialization phase; KD phase follows initialization phase; qed");
		if message.publics.len() != threshold + 1 {
			return Err(Error::InvalidMessage);
		}

		// update node data
		{
			let node_data = data.nodes.get_mut(&sender).ok_or(Error::InvalidMessage)?;
			if node_data.secret1.is_some() || node_data.secret2.is_some() || node_data.publics.is_some() {
				return Err(Error::InvalidStateForRequest);
			}

			node_data.secret1 = Some(message.secret1.clone().into());
			node_data.secret2 = Some(message.secret2.clone().into());
			node_data.publics = Some(message.publics.iter().cloned().map(Into::into).collect());
		}

		// check if we have received keys from every other node
		if data.nodes.iter().any(|(node_id, node_data)| node_id != self.node() && (node_data.publics.is_none() || node_data.secret1.is_none() || node_data.secret2.is_none())) {
			return Ok(())
		}

		drop(data);
		self.verify_keys()
	}

	/// When public key share is received.
	pub fn on_public_key_share(&self, sender: NodeId, message: &PublicKeyShare) -> Result<(), Error> {
		let mut data = self.data.lock();

		// check state
		if data.state != SessionState::WaitingForPublicKeyShare {
			match data.state {
				SessionState::WaitingForInitializationComplete |
					SessionState::WaitingForKeysDissemination => return Err(Error::TooEarlyForRequest),
				_ => return Err(Error::InvalidStateForRequest),
			}
		}

		// update node data with received public share
		{
			let node_data = &mut data.nodes.get_mut(&sender).ok_or(Error::InvalidMessage)?;
				if node_data.public_share.is_some() {
				return Err(Error::InvalidMessage);
			}

			node_data.public_share = Some(message.public_share.clone().into());
		}

		// if there's also nodes, which has not sent us their public shares - do nothing
		if data.nodes.iter().any(|(node_id, node_data)| node_id != self.node() && node_data.public_share.is_none()) {
			return Ok(());
		}

		drop(data);
		self.complete_generation()
	}

	/// When session completion message is received.
	pub fn on_session_completed(&self, sender: NodeId, message: &SessionCompleted) -> Result<(), Error> {
		debug_assert!(self.id == *message.session);
		debug_assert!(&sender != self.node());

		let mut data = self.data.lock();
		debug_assert!(data.nodes.contains_key(&sender));

		// check state
		if data.state != SessionState::WaitingForGenerationConfirmation {
			match data.state {
				SessionState::WaitingForPublicKeyShare => return Err(Error::TooEarlyForRequest),
				_ => return Err(Error::InvalidStateForRequest),
			}
		}

		// if we are not masters, save result and respond with confirmation
		if data.master.as_ref() != Some(self.node()) {
			// check that we have received message from master
			if data.master.as_ref() != Some(&sender) {
				return Err(Error::InvalidMessage);
			}

			// save encrypted data to key storage
			let encrypted_data = DocumentKeyShare {
				author: data.author.as_ref().expect("author is filled in initialization phase; KG phase follows initialization phase; qed").clone(),
				threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"),
				id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(),
				secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(),
				common_point: None,
				encrypted_point: None,
			};
			
			if let Some(ref key_storage) = self.key_storage {
				key_storage.insert(self.id.clone(), encrypted_data.clone())
					.map_err(|e| Error::KeyStorage(e.into()))?;
			}

			// then respond with confirmation
			data.state = SessionState::Finished;
			return self.cluster.send(&sender, Message::Generation(GenerationMessage::SessionCompleted(SessionCompleted {
				session: self.id.clone().into(),
			})));
		}

		// remember that we have received confirmation from sender node
		{
			let sender_node = data.nodes.get_mut(&sender).expect("node is always qualified by himself; qed");
			if sender_node.completion_confirmed {
				return Err(Error::InvalidMessage);
			}

			sender_node.completion_confirmed = true;
		}

		// check if we have received confirmations from all cluster nodes
		if data.nodes.iter().any(|(_, node_data)| !node_data.completion_confirmed) {
			return Ok(())
		}

		// we have received enough confirmations => complete session
		data.state = SessionState::Finished;
		self.completed.notify_all();

		Ok(())
	}

	/// When error has occured on another node.
	pub fn on_session_error(&self, sender: NodeId, message: &SessionError) -> Result<(), Error> {
		let mut data = self.data.lock();

		warn!("{}: generation session failed with error: {} from {}", self.node(), message.error, sender);

		data.state = SessionState::Failed;
		data.key_share = Some(Err(Error::Io(message.error.clone())));
		data.joint_public_and_secret = Some(Err(Error::Io(message.error.clone())));
		self.completed.notify_all();

		Ok(())
	}

	/// Complete initialization (when all other nodex has responded with confirmation)
	fn complete_initialization(&self, mut derived_point: Public) -> Result<(), Error> {
		// update point once again to make sure that derived point is not generated by last node
		math::update_random_point(&mut derived_point)?;

		// remember derived point
		let mut data = self.data.lock();
		data.derived_point = Some(derived_point.clone().into());

		// broadcast derived point && other session paraeters to every other node
		self.cluster.broadcast(Message::Generation(GenerationMessage::CompleteInitialization(CompleteInitialization {
			session: self.id.clone().into(),
			derived_point: derived_point.into(),
		})))
	}

	/// Keys dissemination (KD) phase
	fn disseminate_keys(&self) -> Result<(), Error> {
		let mut data = self.data.lock();

		// pick 2t + 2 random numbers as polynomial coefficients for 2 polynoms
		let threshold = data.threshold.expect("threshold is filled on initialization phase; KD phase follows initialization phase; qed");
		let polynom1 = math::generate_random_polynom(threshold)?;
		let polynom2 = math::generate_random_polynom(threshold)?;
		data.secret_coeff = Some(polynom1[0].clone());
		
		// compute t+1 public values
		let publics = math::public_values_generation(threshold,
			data.derived_point.as_ref().expect("keys dissemination occurs after derived point is agreed; qed"),
			&polynom1,
			&polynom2)?;

		// compute secret values for every other node
		for (node, node_data) in data.nodes.iter_mut() {
			let secret1 = math::compute_polynom(&polynom1, &node_data.id_number)?;
			let secret2 = math::compute_polynom(&polynom2, &node_data.id_number)?;

			// send a message containing secret1 && secret2 to other node
			if node != self.node() {
				node_data.secret1_sent = Some(secret1.clone());
				node_data.secret2_sent = Some(secret2.clone());

				self.cluster.send(&node, Message::Generation(GenerationMessage::KeysDissemination(KeysDissemination {
					session: self.id.clone().into(),
					secret1: secret1.into(),
					secret2: secret2.into(),
					publics: publics.iter().cloned().map(Into::into).collect(),
				})))?;
			} else {
				node_data.secret1 = Some(secret1);
				node_data.secret2 = Some(secret2);
				node_data.publics = Some(publics.clone());
			}
		}

		// update state
		data.state = SessionState::WaitingForKeysDissemination;

		Ok(())
	}

	/// Keys verification (KV) phase
	fn verify_keys(&self) -> Result<(), Error> {
		let mut data = self.data.lock();
		
		// key verification (KV) phase: check that other nodes have passed correct secrets
		let threshold = data.threshold.expect("threshold is filled in initialization phase; KV phase follows initialization phase; qed");
		let derived_point = data.derived_point.clone().expect("derived point generated on initialization phase; KV phase follows initialization phase; qed");
		let number_id = data.nodes[self.node()].id_number.clone();
		for (_	, node_data) in data.nodes.iter_mut().filter(|&(node_id, _)| node_id != self.node()) {
			let secret1 = node_data.secret1.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
			let secret2 = node_data.secret2.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
			let publics = node_data.publics.as_ref().expect("keys received on KD phase; KV phase follows KD phase; qed");
			let is_key_verification_ok = math::keys_verification(threshold, &derived_point, &number_id,
				secret1, secret2, publics)?;

			if !is_key_verification_ok {
				// node has sent us incorrect values. In original ECDKG protocol we should have sent complaint here.
				return Err(Error::InvalidMessage);
			}
		}

		// calculate public share
		let self_public_share = {
			let self_secret_coeff = data.secret_coeff.as_ref().expect("secret_coeff is generated on KD phase; KG phase follows KD phase; qed");
			math::compute_public_share(self_secret_coeff)?
		};

		// calculate self secret + public shares
		let self_secret_share = {
			let secret_values_iter = data.nodes.values()
				.map(|n| n.secret1.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
			math::compute_secret_share(secret_values_iter)?
		};

		// update state
		data.state = SessionState::WaitingForPublicKeyShare;
		data.secret_share = Some(self_secret_share);
		let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
		self_node.public_share = Some(self_public_share.clone());

		// broadcast self public key share
		self.cluster.broadcast(Message::Generation(GenerationMessage::PublicKeyShare(PublicKeyShare {
			session: self.id.clone().into(),
			public_share: self_public_share.into(),
		})))
	}

	/// Complete generation
	fn complete_generation(&self) -> Result<(), Error> {
		let mut data = self.data.lock();
		
		// else - calculate joint public key
		let joint_public = {
			let public_shares = data.nodes.values().map(|n| n.public_share.as_ref().expect("keys received on KD phase; KG phase follows KD phase; qed"));
			math::compute_joint_public(public_shares)?
		};

		// prepare key data
		let encrypted_data = DocumentKeyShare {
			author: data.author.as_ref().expect("author is filled in initialization phase; KG phase follows initialization phase; qed").clone(),
			threshold: data.threshold.expect("threshold is filled in initialization phase; KG phase follows initialization phase; qed"),
			id_numbers: data.nodes.iter().map(|(node_id, node_data)| (node_id.clone(), node_data.id_number.clone())).collect(),
			secret_share: data.secret_share.as_ref().expect("secret_share is filled in KG phase; we are at the end of KG phase; qed").clone(),
			common_point: None,
			encrypted_point: None,
		};

		// if we are at the slave node - wait for session completion
		let secret_coeff = data.secret_coeff.as_ref().expect("secret coeff is selected on initialization phase; current phase follows initialization; qed").clone();
		if data.master.as_ref() != Some(self.node()) {
			data.key_share = Some(Ok(encrypted_data));
			data.joint_public_and_secret = Some(Ok((joint_public, secret_coeff)));
			data.state = SessionState::WaitingForGenerationConfirmation;
			return Ok(());
		}

		// then save encrypted data to the key storage
		if let Some(ref key_storage) = self.key_storage {
			key_storage.insert(self.id.clone(), encrypted_data.clone())
				.map_err(|e| Error::KeyStorage(e.into()))?;
		}

		// then distribute encrypted data to every other node
		self.cluster.broadcast(Message::Generation(GenerationMessage::SessionCompleted(SessionCompleted {
			session: self.id.clone().into(),
		})))?;

		// then wait for confirmation from all other nodes
		{
			let self_node = data.nodes.get_mut(self.node()).expect("node is always qualified by himself; qed");
			self_node.completion_confirmed = true;
		}
		data.key_share = Some(Ok(encrypted_data));
		data.joint_public_and_secret = Some(Ok((joint_public, secret_coeff)));
		data.state = SessionState::WaitingForGenerationConfirmation;

		Ok(())
	}
}

impl ClusterSession for SessionImpl {
	fn is_finished(&self) -> bool {
		let data = self.data.lock();
		data.state == SessionState::Failed
			|| data.state == SessionState::Finished
	}

	fn on_node_timeout(&self, node: &NodeId) {
		let mut data = self.data.lock();

		// all nodes are required for generation session
		// => fail without check
		warn!("{}: generation session failed because {} connection has timeouted", self.node(), node);

		data.state = SessionState::Failed;
		data.key_share = Some(Err(Error::NodeDisconnected));
		data.joint_public_and_secret = Some(Err(Error::NodeDisconnected));
		self.completed.notify_all();
	}

	fn on_session_timeout(&self) {
		let mut data = self.data.lock();

		warn!("{}: generation session failed with timeout", self.node());

		data.state = SessionState::Failed;
		data.key_share = Some(Err(Error::NodeDisconnected));
		data.joint_public_and_secret = Some(Err(Error::NodeDisconnected));
		self.completed.notify_all();
	}
}

impl Session for SessionImpl {
	fn state(&self) -> SessionState {
		self.data.lock().state.clone()
	}

	fn wait(&self, timeout: Option<time::Duration>) -> Result<Public, Error> {
		let mut data = self.data.lock();
		if !data.joint_public_and_secret.is_some() {
			match timeout {
				None => self.completed.wait(&mut data),
				Some(timeout) => { self.completed.wait_for(&mut data, timeout); },
			}
		}

		data.joint_public_and_secret.clone()
			.expect("checked above or waited for completed; completed is only signaled when joint_public.is_some(); qed")
			.map(|p| p.0)
	}

	fn joint_public_and_secret(&self) -> Option<Result<(Public, Secret), Error>> {
		self.data.lock().joint_public_and_secret.clone()
	}
}

impl EveryOtherNodeVisitor {
	pub fn new<I>(self_id: &NodeId, nodes: I) -> Self where I: Iterator<Item=NodeId> {
		EveryOtherNodeVisitor {
			visited: BTreeSet::new(),
			unvisited: nodes.filter(|n| n != self_id).collect(),
			in_progress: BTreeSet::new(),
		}
	}

	pub fn next_node(&mut self) -> Option<NodeId> {
		let next_node = self.unvisited.pop_front();
		if let Some(ref next_node) = next_node {
			self.in_progress.insert(next_node.clone());
		}
		next_node
	}

	pub fn mark_visited(&mut self, node: &NodeId) -> bool {
		if !self.in_progress.remove(node) {
			return false;
		}
		self.visited.insert(node.clone())
	}
}

impl NodeData {
	fn with_id_number(node_id_number: Secret) -> Self {
		NodeData {
			id_number: node_id_number,
			secret1_sent: None,
			secret2_sent: None,
			secret1: None,
			secret2: None,
			publics: None,
			public_share: None,
			completion_confirmed: false,
		}
	}
}

impl Debug for SessionImpl {
	fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
		write!(f, "Generation session {} on {}", self.id, self.self_node_id)
	}
}

pub fn check_cluster_nodes(self_node_id: &NodeId, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
	// at least two nodes must be in cluster
	if nodes.len() < 1 {
		return Err(Error::InvalidNodesCount);
	}
	// this node must be a part of cluster
	if !nodes.contains(self_node_id) {
		return Err(Error::InvalidNodesConfiguration);
	}

	Ok(())
}

pub fn check_threshold(threshold: usize, nodes: &BTreeSet<NodeId>) -> Result<(), Error> {
	// at least threshold + 1 nodes are required to collectively decrypt message
	if threshold >= nodes.len() {
		return Err(Error::InvalidThreshold);
	}

	Ok(())
}

#[cfg(test)]
pub mod tests {
	use std::time;
	use std::sync::Arc;
	use std::collections::{BTreeSet, BTreeMap, VecDeque};
	use tokio_core::reactor::Core;
	use ethkey::{Random, Generator, Public};
	use key_server_cluster::{NodeId, SessionId, Error, DummyKeyStorage};
	use key_server_cluster::message::{self, Message, GenerationMessage};
	use key_server_cluster::cluster::tests::{DummyCluster, make_clusters, run_clusters, loop_until, all_connections_established};
	use key_server_cluster::cluster_sessions::ClusterSession;
	use key_server_cluster::generation_session::{Session, SessionImpl, SessionState, SessionParams};
	use key_server_cluster::math;
	use key_server_cluster::math::tests::do_encryption_and_decryption;

	pub struct Node {
		pub cluster: Arc<DummyCluster>,
		pub key_storage: Arc<DummyKeyStorage>,
		pub session: SessionImpl,
	}

	pub struct MessageLoop {
		pub session_id: SessionId,
		pub nodes: BTreeMap<NodeId, Node>,
		pub queue: VecDeque<(NodeId, NodeId, Message)>,
	}

	impl MessageLoop {
		pub fn new(nodes_num: usize) -> Self {
			let mut nodes = BTreeMap::new();
			let session_id = SessionId::default();
			for _ in 0..nodes_num {
				let key_pair = Random.generate().unwrap();
				let node_id = key_pair.public().clone();
				let cluster = Arc::new(DummyCluster::new(node_id.clone()));
				let key_storage = Arc::new(DummyKeyStorage::default());
				let session = SessionImpl::new(SessionParams {
					id: session_id.clone(),
					self_node_id: node_id.clone(),
					key_storage: Some(key_storage.clone()),
					cluster: cluster.clone(),
				});
				nodes.insert(node_id, Node { cluster: cluster, key_storage: key_storage, session: session });
			}

			let nodes_ids: Vec<_> = nodes.keys().cloned().collect();
			for node in nodes.values() {
				for node_id in &nodes_ids {
					node.cluster.add_node(node_id.clone());
				}
			}

			MessageLoop {
				session_id: session_id,
				nodes: nodes,
				queue: VecDeque::new(),
			}
		}

		pub fn master(&self) -> &SessionImpl {
			&self.nodes.values().nth(0).unwrap().session
		}

		pub fn first_slave(&self) -> &SessionImpl {
			&self.nodes.values().nth(1).unwrap().session
		}

		pub fn second_slave(&self) -> &SessionImpl {
			&self.nodes.values().nth(2).unwrap().session
		}

		pub fn take_message(&mut self) -> Option<(NodeId, NodeId, Message)> {
			self.nodes.values()
				.filter_map(|n| n.cluster.take_message().map(|m| (n.session.node().clone(), m.0, m.1)))
				.nth(0)
				.or_else(|| self.queue.pop_front())
		}

		pub fn process_message(&mut self, msg: (NodeId, NodeId, Message)) -> Result<(), Error> {
			match {
				match msg.2 {
					Message::Generation(GenerationMessage::InitializeSession(ref message)) => self.nodes[&msg.1].session.on_initialize_session(msg.0.clone(), &message),
					Message::Generation(GenerationMessage::ConfirmInitialization(ref message)) => self.nodes[&msg.1].session.on_confirm_initialization(msg.0.clone(), &message),
					Message::Generation(GenerationMessage::CompleteInitialization(ref message)) => self.nodes[&msg.1].session.on_complete_initialization(msg.0.clone(), &message),
					Message::Generation(GenerationMessage::KeysDissemination(ref message)) => self.nodes[&msg.1].session.on_keys_dissemination(msg.0.clone(), &message),
					Message::Generation(GenerationMessage::PublicKeyShare(ref message)) => self.nodes[&msg.1].session.on_public_key_share(msg.0.clone(), &message),
					Message::Generation(GenerationMessage::SessionCompleted(ref message)) => self.nodes[&msg.1].session.on_session_completed(msg.0.clone(), &message),
					_ => panic!("unexpected"),
				}
			} {
				Ok(_) => Ok(()),
				Err(Error::TooEarlyForRequest) => {
					self.queue.push_back(msg);
					Ok(())
				},
				Err(err) => Err(err),
			}
		}

		pub fn take_and_process_message(&mut self) -> Result<(), Error> {
			let msg = self.take_message().unwrap();
			self.process_message(msg)
		}
	}

	fn make_simple_cluster(threshold: usize, num_nodes: usize) -> Result<(SessionId, NodeId, NodeId, MessageLoop), Error> {
		let l = MessageLoop::new(num_nodes);
		l.master().initialize(Public::default(), threshold, l.nodes.keys().cloned().collect())?;

		let session_id = l.session_id.clone();
		let master_id = l.master().node().clone();
		let slave_id = l.first_slave().node().clone();
		Ok((session_id, master_id, slave_id, l))
	}

	#[test]
	fn initializes_in_cluster_of_single_node() {
		let l = MessageLoop::new(1);
		assert!(l.master().initialize(Public::default(), 0, l.nodes.keys().cloned().collect()).is_ok());
	}

	#[test]
	fn fails_to_initialize_if_not_a_part_of_cluster() {
		let node_id = math::generate_random_point().unwrap();
		let cluster = Arc::new(DummyCluster::new(node_id.clone()));
		let session = SessionImpl::new(SessionParams {
			id: SessionId::default(),
			self_node_id: node_id.clone(),
			key_storage: Some(Arc::new(DummyKeyStorage::default())),
			cluster: cluster,
		});
		let cluster_nodes: BTreeSet<_> = (0..2).map(|_| math::generate_random_point().unwrap()).collect();
		assert_eq!(session.initialize(Public::default(), 0, cluster_nodes).unwrap_err(), Error::InvalidNodesConfiguration);
	}

	#[test]
	fn fails_to_initialize_if_threshold_is_wrong() {
		match make_simple_cluster(2, 2) {
			Err(Error::InvalidThreshold) => (),
			_ => panic!("unexpected"),
		}
	}

	#[test]
	fn fails_to_initialize_when_already_initialized() {
		let (_, _, _, l) = make_simple_cluster(0, 2).unwrap();
		assert_eq!(l.master().initialize(Public::default(), 0, l.nodes.keys().cloned().collect()).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn fails_to_accept_initialization_when_already_initialized() {
		let (_, _, _, mut l) = make_simple_cluster(0, 2).unwrap();
		let message = l.take_message().unwrap();
		l.process_message(message.clone()).unwrap();
		assert_eq!(l.process_message(message.clone()).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn slave_updates_derived_point_on_initialization() {
		let (_, _, _, mut l) = make_simple_cluster(0, 2).unwrap();
		let passed_point = match l.take_message().unwrap() {
			(f, t, Message::Generation(GenerationMessage::InitializeSession(message))) => {
				let point = message.derived_point.clone();
				l.process_message((f, t, Message::Generation(GenerationMessage::InitializeSession(message)))).unwrap();
				point
			},
			_ => panic!("unexpected"),
		};

		match l.take_message().unwrap() {
			(_, _, Message::Generation(GenerationMessage::ConfirmInitialization(message))) => assert!(passed_point != message.derived_point),
			_ => panic!("unexpected"),
		}
	}

	#[test]
	fn fails_to_accept_initialization_confirmation_if_already_accepted_from_the_same_node() {
		let (sid, _, s, mut l) = make_simple_cluster(0, 3).unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		assert_eq!(l.master().on_confirm_initialization(s, &message::ConfirmInitialization {
			session: sid.into(),
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn fails_to_accept_initialization_confirmation_if_initialization_already_completed() {
		let (sid, _, s, mut l) = make_simple_cluster(0, 2).unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		assert_eq!(l.master().on_confirm_initialization(s, &message::ConfirmInitialization {
			session: sid.into(),
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn master_updates_derived_point_on_initialization_completion() {
		let (_, _, _, mut l) = make_simple_cluster(0, 2).unwrap();
		l.take_and_process_message().unwrap();
		let passed_point = match l.take_message().unwrap() {
			(f, t, Message::Generation(GenerationMessage::ConfirmInitialization(message))) => {
				let point = message.derived_point.clone();
				l.process_message((f, t, Message::Generation(GenerationMessage::ConfirmInitialization(message)))).unwrap();
				point
			},
			_ => panic!("unexpected"),
		};

		assert!(l.master().derived_point().unwrap() != passed_point.into());
	}

	#[test]
	fn fails_to_complete_initialization_if_not_a_part_of_cluster() {
		let (sid, m, _, l) = make_simple_cluster(0, 2).unwrap();
		let mut nodes = BTreeMap::new();
		nodes.insert(m, math::generate_random_scalar().unwrap());
		nodes.insert(math::generate_random_point().unwrap(), math::generate_random_scalar().unwrap());
		assert_eq!(l.first_slave().on_initialize_session(m, &message::InitializeSession {
			session: sid.into(),
			author: Public::default().into(),
			nodes: nodes.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
			threshold: 0,
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidNodesConfiguration);
	}

	#[test]
	fn fails_to_complete_initialization_if_threshold_is_wrong() {
		let (sid, m, s, l) = make_simple_cluster(0, 2).unwrap();
		let mut nodes = BTreeMap::new();
		nodes.insert(m, math::generate_random_scalar().unwrap());
		nodes.insert(s, math::generate_random_scalar().unwrap());
		assert_eq!(l.first_slave().on_initialize_session(m, &message::InitializeSession {
			session: sid.into(),
			author: Public::default().into(),
			nodes: nodes.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
			threshold: 2,
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidThreshold);
	}

	#[test]
	fn fails_to_complete_initialization_if_not_waiting_for_it() {
		let (sid, m, _, l) = make_simple_cluster(0, 2).unwrap();
		assert_eq!(l.first_slave().on_complete_initialization(m, &message::CompleteInitialization {
			session: sid.into(),
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn fails_to_complete_initialization_from_non_master_node() {
		let (sid, _, _, mut l) = make_simple_cluster(0, 3).unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		l.take_and_process_message().unwrap();
		assert_eq!(l.first_slave().on_complete_initialization(l.second_slave().node().clone(), &message::CompleteInitialization {
			session: sid.into(),
			derived_point: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidMessage);
	}

	#[test]
	fn fails_to_accept_keys_dissemination_if_not_waiting_for_it() {
		let (sid, _, s, l) = make_simple_cluster(0, 2).unwrap();
		assert_eq!(l.master().on_keys_dissemination(s, &message::KeysDissemination {
			session: sid.into(),
			secret1: math::generate_random_scalar().unwrap().into(),
			secret2: math::generate_random_scalar().unwrap().into(),
			publics: vec![math::generate_random_point().unwrap().into()],
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn fails_to_accept_keys_dissemination_if_wrong_number_of_publics_passed() {
		let (sid, m, _, mut l) = make_simple_cluster(0, 3).unwrap();
		l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
		l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
		l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
		assert_eq!(l.first_slave().on_keys_dissemination(m, &message::KeysDissemination {
			session: sid.into(),
			secret1: math::generate_random_scalar().unwrap().into(),
			secret2: math::generate_random_scalar().unwrap().into(),
			publics: vec![math::generate_random_point().unwrap().into(), math::generate_random_point().unwrap().into()],
		}).unwrap_err(), Error::InvalidMessage);
	}

	#[test]
	fn fails_to_accept_keys_dissemination_second_time_from_the_same_node() {
		let (sid, m, _, mut l) = make_simple_cluster(0, 3).unwrap();
		l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
		l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
		l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
		assert_eq!(l.first_slave().on_keys_dissemination(m, &message::KeysDissemination {
			session: sid.into(),
			secret1: math::generate_random_scalar().unwrap().into(),
			secret2: math::generate_random_scalar().unwrap().into(),
			publics: vec![math::generate_random_point().unwrap().into()],
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn should_not_accept_public_key_share_when_is_not_waiting_for_it() {
		let (sid, _, s, l) = make_simple_cluster(1, 3).unwrap();
		assert_eq!(l.master().on_public_key_share(s, &message::PublicKeyShare {
			session: sid.into(),
			public_share: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidStateForRequest);
	}

	#[test]
	fn should_not_accept_public_key_share_when_receiving_twice() {
		let (sid, m, _, mut l) = make_simple_cluster(0, 3).unwrap();
		l.take_and_process_message().unwrap(); // m -> s1: InitializeSession
		l.take_and_process_message().unwrap(); // m -> s2: InitializeSession
		l.take_and_process_message().unwrap(); // s1 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // s2 -> m: ConfirmInitialization
		l.take_and_process_message().unwrap(); // m -> s1: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s2: CompleteInitialization
		l.take_and_process_message().unwrap(); // m -> s1: KeysDissemination
		l.take_and_process_message().unwrap(); // m -> s2: KeysDissemination
		l.take_and_process_message().unwrap(); // s1 -> m: KeysDissemination
		l.take_and_process_message().unwrap(); // s1 -> s2: KeysDissemination
		l.take_and_process_message().unwrap(); // s2 -> m: KeysDissemination
		l.take_and_process_message().unwrap(); // s2 -> s1: KeysDissemination
		let (f, t, msg) = match l.take_message() {
			Some((f, t, Message::Generation(GenerationMessage::PublicKeyShare(msg)))) => (f, t, msg),
			_ => panic!("unexpected"),
		};
		assert_eq!(&f, l.master().node());
		assert_eq!(&t, l.second_slave().node());
		l.process_message((f, t, Message::Generation(GenerationMessage::PublicKeyShare(msg.clone())))).unwrap();
		assert_eq!(l.second_slave().on_public_key_share(m, &message::PublicKeyShare {
			session: sid.into(),
			public_share: math::generate_random_point().unwrap().into(),
		}).unwrap_err(), Error::InvalidMessage);
	}


	#[test]
	fn encryption_fails_on_session_timeout() {
		let (_, _, _, l) = make_simple_cluster(0, 2).unwrap();
		assert!(l.master().joint_public_and_secret().is_none());
		l.master().on_session_timeout();
		assert!(l.master().joint_public_and_secret().unwrap().unwrap_err() == Error::NodeDisconnected);
	}

	#[test]
	fn encryption_fails_on_node_timeout() {
		let (_, _, _, l) = make_simple_cluster(0, 2).unwrap();
		assert!(l.master().joint_public_and_secret().is_none());
		l.master().on_node_timeout(l.first_slave().node());
		assert!(l.master().joint_public_and_secret().unwrap().unwrap_err() == Error::NodeDisconnected);
	}

	#[test]
	fn complete_enc_dec_session() {
		let test_cases = [(0, 5), (2, 5), (3, 5)];
		for &(threshold, num_nodes) in &test_cases {
			let mut l = MessageLoop::new(num_nodes);
			l.master().initialize(Public::default(), threshold, l.nodes.keys().cloned().collect()).unwrap();
			assert_eq!(l.nodes.len(), num_nodes);

			// let nodes do initialization + keys dissemination
			while let Some((from, to, message)) = l.take_message() {
				l.process_message((from, to, message)).unwrap();
			}

			// check that all nodes has finished joint public generation
			let joint_public_key = l.master().joint_public_and_secret().unwrap().unwrap().0;
			for node in l.nodes.values() {
				let state = node.session.state();
				assert_eq!(state, SessionState::Finished);
				assert_eq!(node.session.joint_public_and_secret().map(|p| p.map(|p| p.0)), Some(Ok(joint_public_key)));
			}

			// now let's encrypt some secret (which is a point on EC)
			let document_secret_plain = Random.generate().unwrap().public().clone();
			let all_nodes_id_numbers: Vec<_> = l.master().data.lock().nodes.values().map(|n| n.id_number.clone()).collect();
			let all_nodes_secret_shares: Vec<_> = l.nodes.values().map(|n| n.session.data.lock().secret_share.as_ref().unwrap().clone()).collect();
			let document_secret_decrypted = do_encryption_and_decryption(threshold, &joint_public_key,
				&all_nodes_id_numbers,
				&all_nodes_secret_shares,
				None,
				document_secret_plain.clone()
			).0;
			assert_eq!(document_secret_plain, document_secret_decrypted);
		}
	}

	#[test]
	fn encryption_session_works_over_network() {
		//::util::log::init_log();

		let test_cases = [(1, 3)];
		for &(threshold, num_nodes) in &test_cases {
			let mut core = Core::new().unwrap();

			// prepare cluster objects for each node
			let clusters = make_clusters(&core, 6022, num_nodes);
			run_clusters(&clusters);

			// establish connections
			loop_until(&mut core, time::Duration::from_millis(300), || clusters.iter().all(all_connections_established));

			// run session to completion
			let session_id = SessionId::default();
			let session = clusters[0].client().new_generation_session(session_id, Public::default(), threshold).unwrap();
			loop_until(&mut core, time::Duration::from_millis(1000), || session.joint_public_and_secret().is_some());
		}
	}
}
